AlphaZero Gomoku Implementation

Notes on implementing AlphaGo Zero for Gomoku

The code link is at the bottom; if this helps you, please give it a star ☺☺☺

The Gomoku implementation uses essentially the same algorithm as for Go. However, the two games are not remotely on the same level in terms of the complexity of computing a move, the performance required for Monte Carlo simulation, the variability of game trajectories, and the difficulty of getting the model to converge. Hats off to Google here.

Results after 1,300 self-play games

Results after 2,400 self-play games

Notes from reading and implementing the paper

  • Key features of AlphaGo Zero (hereafter "Zero"):
    • Unlike AlphaGo Lee, Zero is trained purely through self-play, without any existing game data for supervised learning
    • Feature extraction is also relatively simple
    • Only a single neural network (hereafter "net") is used, consisting of a shared feature front end and policy and value heads
    • The Monte Carlo tree search (hereafter "mcts") logic is relatively simple

Reinforcement learning in Zero

Zero's net takes as input the historical and current board positions encoded as binary (0/1) feature planes, and outputs a policy p and a value v, where p is the probability of playing at each point on the board and v is an estimate of the current player's probability of winning from the current position.

During training, the model plays against itself (self-play); every move is decided by mcts combined with the net. This mcts+net combination acts as a strengthened policy player, and the self-play results it produces are what drive the net toward convergence.

The concrete mcts+net procedure: 1. select a child node; 2. expand the child node, taking the locally best choice; 3. update the parent nodes; 4. after looping over steps 1-3, play a move.

Each node stores its visit count N, the prior probability P given by the net, and the mean action value Q.

Each simulation starts at the root and recursively selects the child with the highest PUCT score until it reaches a leaf node, then returns along the same path, updating every node's N and Q; play continues this way until the game is decided or drawn.
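
The per-child selection score is the PUCT rule from the paper; it is exactly what uct() computes in the TreeNode class further down:

$$U(a) = Q(a) + c_{\mathrm{puct}} \cdot P(a) \cdot \frac{\sqrt{N_{\mathrm{parent}}}}{1 + N(a)}$$

where N_parent is the visit count of the parent node.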

When the simulations are finished, the child with the largest visit count N is chosen as the next move.

At every move, record the current board state, the policy pi that mcts+net produced for this move, and the current player; these records are used to train the net.
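
A minimal sketch of this bookkeeping (self_play_episode and the run_mcts callback are illustrative names, not the repo's API; the board methods are the ones shown in the code sections below):

import numpy as np

def self_play_episode(board, run_mcts):
    """One self-play game. run_mcts(board) is assumed to return a probability
    vector pi over all board points. Returns (state, pi, reward) tuples, where
    the reward is seen from the perspective of the player to move in that state."""
    states, pis, players = [], [], []

    while True:
        pi = run_mcts(board)
        states.append(board.gen_state())
        pis.append(pi)
        players.append(board.c_player)

        action = int(np.random.choice(len(pi), p=pi))
        board.move(action)
        board.trigger()                           # hand the turn to the other player

        if board.is_game_over(player=board.last_player):
            winner = board.last_player            # only the last mover can have won
            rewards = [1. if p == winner else -1. for p in players]
            break
        if board.is_draw():
            rewards = [0.] * len(players)
            break

    return list(zip(states, pis, rewards))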

Details

  • Temperature: during the first 30 moves of self-play the temperature is set to 1; at all other times, including when evaluating model performance, a very small value is used
  • The root node's prior P is perturbed with Dirichlet noise so that more candidate moves get explored
  • After every move, the selected child becomes the new root node
  • The features fed into the net are 8 planes containing only black stones, 8 planes containing only white stones, plus planes encoding the current player and the last move
  • The loss function combines mean-squared error and cross-entropy (see the formula after this list)
  • During training, the input features are augmented by rotating and flipping the board
  • In mcts, the move probabilities obtained at each step are passed through a softmax
  • In mcts, mind the sign of the value when updating parent nodes
  • Unlike Go, judging the outcome in Gomoku only requires looking at the side that made the last move, since only the last mover can have completed five in a row
  • The neural network is not the decisive component in Zero; even with a uniform prior distribution, enough simulations still produce a playable opponent
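
The combined loss referenced in the list above, as defined in the AlphaGo Zero paper:

$$l = (z - v)^2 - \boldsymbol{\pi}^{\top} \log \mathbf{p} + c \lVert \theta \rVert^2$$

where z is the game outcome, v the predicted value, π the MCTS policy target, p the predicted policy, and c the L2 regularization coefficient.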

Code Walkthrough

Building the Gomoku board

class Board(object):
    def __init__(self,
                 size=SIZE,
                 hist_num=HISTORY,
                 c_action=-1,
                 player=BLACK):

        self.size = size
        self.c_action = c_action  # flat index of the last move (-1 before any move)
        self.hist_num = hist_num
        self.valid_moves = list(range(size**2))
        self.invalid_moves = []
        self.board = np.zeros([size, size])
        self.c_player = player
        self.players = {"black": BLACK, "white": WHITE}

        # BLACK -> 0 | WHITE -> 1
        self.history = [np.zeros((hist_num, size, size)),
                        np.zeros((hist_num, size, size))]

Check whether the game has ended with a win. The idea: centered on the last move, look along the horizontal, vertical, and both diagonal directions for five stones of the same color.

def is_game_over(self, player=None):
    x, y = self.c_action // self.size, self.c_action % self.size
    if player is None:
        player = self.c_player

    # vertical: vary the first index i, column y fixed
    for i in range(x - 4, x + 5):
        if self._get_piece(i, y) == self._get_piece(i + 1, y) == self._get_piece(i + 2, y) == self._get_piece(i + 3, y) == self._get_piece(i + 4, y) == player:
            return True

    # horizontal: vary the second index j, row x fixed
    for j in range(y - 4, y + 5):
        if self._get_piece(x, j) == self._get_piece(x, j + 1) == self._get_piece(x, j + 2) == self._get_piece(x, j + 3) == self._get_piece(x, j + 4) == player:
            return True

    # main diagonal (\): i and j increase together
    j = y - 4
    for i in range(x - 4, x + 5):
        if self._get_piece(i, j) == self._get_piece(i + 1, j + 1) == self._get_piece(i + 2, j + 2) == self._get_piece(i + 3, j + 3) == self._get_piece(i + 4, j + 4) == player:
            return True
        j += 1

    # anti-diagonal (/): i decreases while j increases
    i = x + 4
    for j in range(y - 4, y + 5):
        if self._get_piece(i, j) == self._get_piece(i - 1, j + 1) == self._get_piece(i - 2, j + 2) == self._get_piece(i - 3, j + 3) == self._get_piece(i - 4, j + 4) == player:
            return True
        i -= 1

    return False

Build the features of the current position, including the board history, the current player, and the last move.

def gen_state(self):
    # one-hot plane marking the last move
    to_action = np.zeros((1, self.size, self.size))
    to_action[0][self.c_action // self.size,
                 self.c_action % self.size] = 1.
    # constant plane encoding which player is to move
    to_play = np.full((1, self.size, self.size), self.c_player - BLACK)
    # stack: black history planes, white history planes, player plane, last-move plane
    state = np.concatenate(self.history + [to_play, to_action], axis=0)

    return state
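
A quick shape check, assuming the 8-plane history from the Details list and a 15x15 board (SIZE=15, HISTORY=8; the repo's constants may differ), and using the move() method that appears in the mcts code below:

board = Board()
board.move(112)        # play the centre point of a 15x15 board
state = board.gen_state()
print(state.shape)     # (18, 15, 15): 8 black planes, 8 white planes,
                       # plus the current-player plane and the last-move plane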

Tree node

The tree node used in mcts; see the comments and code for details.

class TreeNode(object):
    def __init__(self,
                 action=None,
                 props=None,
                 parent=None):

        self.parent = parent
        self.action = action
        self.children = []

        self.N = 0  # visit count
        self.Q = .0  # mean action value
        self.W = .0  # total action value
        self.P = props  # prior probability

    def is_leaf(self):
        return len(self.children) == 0

    def select_child(self):
        index = np.argmax(np.asarray([c.uct() for c in self.children]))
        return self.children[index]

    def uct(self):
        return self.Q + self.P * CPUCT * (np.sqrt(self.parent.N) / (1 + self.N))

    def expand_node(self, props):
        self.children = [TreeNode(action=action, props=p, parent=self)
                         for action, p in enumerate(props) if p > 0.]

    def backup(self, v):
        self.N += 1
        self.W += v
        self.Q = self.W / self.N

mcts

Search: 1. descend from the root node until a leaf is reached; 2. feed the current position to the neural network to obtain move probabilities and a value estimate; 3. walk back from the leaf to the root, updating every node along the way.

def search(self, borad, node, temperature=.001):
    self.borad = borad
    self.root = node

    for _ in range(self.ms_num):
        node = self.root
        borad = self.borad.clone()

        while not node.is_leaf():
            node = node.select_child()
            borad.move(node.action)
            borad.trigger()

        # be careful - this state belongs to the opponent (the player to move at the leaf)
        value, props = self.net(
            to_tensor(borad.gen_state(), unsqueeze=True))
        value = to_numpy(value, USECUDA)[0]
        props = np.exp(to_numpy(props, USECUDA))

        # add dirichlet noise for root node
        if node.parent is None:
            props = self.dirichlet_noise(props)

        # normalize
        props[borad.invalid_moves] = 0.
        total_p = np.sum(props)
        if total_p > 0:
            props /= total_p

        # winner, draw or continue
        if borad.is_draw():
            value = 0.
        else:
            done = borad.is_game_over(player=borad.last_player)
            if done:
                value = -1.
            else:
                node.expand_node(props)

        while node is not None:
            value = -value
            node.backup(value)
            node = node.parent
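
Two pieces referenced above are not shown in this excerpt: dirichlet_noise, and how the temperature argument turns the root's visit counts into the final move. A sketch of both, with the noise constants taken from the paper's settings for Go (the repo's values may differ):

import numpy as np

def dirichlet_noise(props, eps=0.25, alpha=0.03):
    # mix Dirichlet noise into the root prior: (1 - eps) * p + eps * Dir(alpha)
    noise = np.random.dirichlet(alpha * np.ones(len(props)))
    return (1 - eps) * props + eps * noise

def select_action(root, temperature=1e-3):
    # pi(a) is proportional to N(a)^(1 / temperature); a tiny temperature
    # effectively collapses this to "pick the child with the largest N"
    visits = np.array([c.N for c in root.children], dtype=np.float64)
    actions = [c.action for c in root.children]

    logits = np.log(visits + 1e-10) / temperature    # log space avoids overflow
    logits -= logits.max()
    probs = np.exp(logits)
    probs /= probs.sum()

    return int(np.random.choice(actions, p=probs)), probs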

Neural network

Feature network

class Feature(nn.Module):
  def __init__(self,
               ind=IND,
               outd=RES_BLOCK_FILLTERS):

    super().__init__()

    self.layer = nn.Sequential(
        nn.Conv2d(ind, outd,
                  stride=1,
                  kernel_size=3,
                  padding=1,
                  bias=False),
        nn.BatchNorm2d(outd),
        nn.ReLU(),
    )
    self.encodes = nn.ModuleList([ResBlockNet() for _ in range(BLOCKS)])

  def forward(self, x):
    x = self.layer(x)
    for enc in self.encodes:
      x = enc(x)
    return x

Policy network

class Policy(nn.Module):
  def __init__(self,
               ind=RES_BLOCK_FILLTERS,
               outd=OUTD,
               kernels=2):

    super().__init__()

    self.out = outd * kernels

    self.conv = nn.Sequential(
        nn.Conv2d(ind, kernels, kernel_size=1),
        nn.BatchNorm2d(kernels),
        nn.ReLU(),
    )

    self.linear = nn.Linear(kernels * outd, outd)
    self.linear.weight.data.uniform_(-.1, .1)

  def forward(self, x):
    x = self.conv(x)
    x = x.view(-1, self.out)
    x = self.linear(x)

    return F.log_softmax(x, dim=-1)

Value network

class Value(nn.Module):
  def __init__(self,
               ind=RES_BLOCK_FILLTERS,
               outd=OUTD,
               hsz=256,
               kernels=1):
    super().__init__()

    self.outd = outd

    self.conv = nn.Sequential(
        nn.Conv2d(ind, kernels, kernel_size=1),
        nn.BatchNorm2d(kernels),
        nn.ReLU(),
    )

    self.linear = nn.Sequential(
        nn.Linear(outd, hsz),
        nn.ReLU(),
        nn.Linear(hsz, 1),
        nn.Tanh(),
    )

    self._reset_parameters()

  def forward(self, x):
    x = self.conv(x)
    x = x.view(-1, self.outd)

    return self.linear(x)

  def _reset_parameters(self):
    for layer in self.modules():
      if type(layer) == nn.Linear:
        layer.weight.data.uniform_(-.1, .1)

Full network

class Net(nn.Module):
  def __init__(self):
    super().__init__()

    self.feat = Feature()
    self.value = Value()
    self.policy = Policy()

  def forward(self, x):
    feats = self.feat(x)
    winners = self.value(feats)
    props = self.policy(feats)

    return winners, props

  def save_model(self, path="model.pt"):
    torch.save(self.state_dict(), path)

  def load_model(self, path="model.pt", cuda=True):
    self.load_state_dict(torch.load(path))

    if cuda:
      self.cuda()
    else:
      self.cpu()
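
A quick shape check under the same assumptions as before (SIZE=15 and HISTORY=8, so IND = 2 * HISTORY + 2 = 18 input planes and OUTD = SIZE ** 2 = 225 move logits; the repo's constants may differ):

import torch

net = Net()
x = torch.randn(4, 18, 15, 15)      # a batch of 4 dummy states
values, log_props = net(x)
print(values.shape)                 # torch.Size([4, 1])   - tanh value in [-1, 1]
print(log_props.shape)              # torch.Size([4, 225]) - log-probability per board point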

Sampling (data augmentation)

def sample(self, datas):
    # expand each (state, pi, reward) into the 8 rotations/reflections of the board
    for state, pi, reward in datas:
        c_state = state.copy()
        c_pi = pi.copy()
        for i in range(4):
            c_state = np.array([np.rot90(s, i) for s in c_state])
            c_pi = np.rot90(c_pi.reshape(SIZE, SIZE), i)
            self.sample_data.append([c_state, c_pi.flatten(), reward])

            c_state = np.array([np.fliplr(s) for s in c_state])
            c_pi = np.fliplr(c_pi)
            self.sample_data.append([c_state, c_pi.flatten(), reward])

    return len(datas)
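
For completeness, a sketch of how the loss formula from the Details section can be evaluated on a mini-batch of these augmented samples (the function name and tensor layout are illustrative; the repo's training loop may differ, and the L2 term is typically delegated to the optimizer's weight decay):

import torch
import torch.nn.functional as F

def compute_loss(net, states, pis, rewards):
    # states:  (B, C, SIZE, SIZE) float tensor of stacked feature planes
    # pis:     (B, SIZE * SIZE)   MCTS policy targets
    # rewards: (B,)               game outcomes z in {-1, 0, 1}
    values, log_props = net(states)                                # value head, log-softmax policy head
    value_loss = F.mse_loss(values.view(-1), rewards)              # (z - v)^2
    policy_loss = -torch.mean(torch.sum(pis * log_props, dim=1))   # -pi^T log p
    return value_loss + policy_loss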

Code

alpha-zero

Written by: Nevermore

步步生姿,空锁满庭花雨。胜将娇花比。